 1.Spark GraphX之Spark GraphX计算
   
   图的定义
   属性操作
   转换操作
   结构操作
   关联操作
   聚合操作
   Pregel API
   引入依赖：
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-graphx_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
   1).案例一：图的基本操作
package cn.lagou.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GraphXExample1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().
      setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    // 定义顶点
    val vertexArray: Array[(VertexId, (String, Int))] = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50))
    )
    val vertexRDD: RDD[(VertexId, (String, Int))] = sc.makeRDD(vertexArray)

    // 定义边
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 6),
      Edge(4L, 1L, 1),
      Edge(5L, 2L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3)
    )
    val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)

    // 图的定义
    val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

    // 属性操作(找出图中年龄 > 30的顶点：属性 > 5的边；属性 > 5的triplets)
    graph.vertices
      .filter{case (_, (_, age)) => age > 30}
      .foreach(println)

    graph.edges
      .filter{edge => edge.attr > 5}
      .foreach(println)

    graph.triplets
      .filter{t => t.attr > 5}
      .foreach(println)

    // 属性操作：degress操作，找出图中最大的出度、入度、度数
    val inDegress: (VertexId, Int) = graph.inDegrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(s"inDegress = $inDegress")

    val outDegress: (VertexId, Int) = graph.outDegrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(s"outDegress = $outDegress")

    val degress: (VertexId, Int) = graph.degrees
      .reduce((x, y) => if (x._2 > y._2) x else y)
    println(s"degress = $degress")


    // 转换操作。顶点转换，所有人年龄加1
    graph.mapVertices{case (id, (name, age)) => (id, (name, age+100))}
      .vertices
      .foreach(println)

    // 边的转换，边的属性*2
    graph.mapEdges(e => e.attr * 2)
      .edges
      .foreach(println)
    // 结构操作。顶点年龄 > 30 的子图
    val subGraph: Graph[(String, Int), Int] =
      graph.subgraph(vpred = (id, vd) => vd._2 > 30)
    subGraph.edges.foreach(println)
    subGraph.vertices.foreach(println)

    // 找出出度=入度的人员
    // 思路：图 + 顶点的出度 + 顶点的入度 => 连接操作
    val initailUserGraph: Graph[User, Int] =
      graph.mapVertices { case (_, (name, age)) => User(name, age, 0, 0) }
  
    val userGraph: Graph[User, Int] =
      initailUserGraph.outerJoinVertices(initailUserGraph.inDegrees) {
        case (_, u, inDeg) => 
          User(u.name, u.age, inDeg.getOrElse(0), u.outDegress
          )
      }.outerJoinVertices(initailUserGraph.outDegrees) {
        case (_, u, outDeg) => 
          User(u.name, u.age, u.inDegress, outDeg.getOrElse(0)
          )
      }
    
    //    userGraph.vertices.foreach(println)
    userGraph.vertices.filter { case (_, user) => 
      user.inDegress == user.outDegress 
    }
      .foreach(println)

    // 顶点5到其他顶点的最短距离。聚合操作(Pregel API)
    val sourceId: VertexId = 5L // 定义源点
    val initailGraph: Graph[Double, Int] =
      graph.mapVertices((id, _) => if (id == sourceId) 
        0.0 else Double.PositiveInfinity)
    
    val disGraph: Graph[Double, Int] =
      initailGraph.pregel(Double.PositiveInfinity)(
        // 两个消息来的时候，取其中最小的路径
        (_, dist, newDist) => math.min(dist, newDist),
        // Send Message 函数
        triplet => {
          if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
            Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
          } else {
            Iterator.empty
          }
        },

        // mergeMsg
        (dista, distb) => math.min(dista, distb)
      )

    disGraph.vertices.foreach(println)

    sc.stop()
  }
}

case class User(name: String, age: Int, inDegress: Int, outDegress: Int)   
   Pregel API
   图本身是递归数据结构，顶点的属性依赖于它们邻居的属性，这些邻居的属性又依赖
于自己邻居的属性。
   所以许多重要的图算法都是迭代的重新计算每个顶点的属性，直到满足某个确定的条
件。
   一系列的图并发抽象被提出来用来表达这些迭代算法。
   GraphX公开了一个类似Pregel的操作。
      vprog：用户定义的顶点运行程序。它作用于每一个顶点，负责接收进来的信息,
并计算新的顶点值
      sendMsg：发送消息
      mergeMsg：合并消息
   2).案例二：连通图算法
   给定数据文件，找到存在的连通体
package cn.lagou.graphx

import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}

object GraphXExample2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().
      setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    // 生成图
    val graph: Graph[Int, Int] =
      GraphLoader.edgeListFile(sc, "data/graph.dat")

    graph.vertices.foreach(println)
    graph.edges.foreach(println)
    // 调用连通图算法
    graph.connectedComponents()
      .vertices
      .sortBy(_._2)
      .foreach(println)

    sc.stop()
  }
}
   
   3).案例三：寻找相同的用户，合并信息
   假设：
        假设有五个不同信息可以作为用户标识，分别为：1X、2X、3X、4X、5X；
        每次可以选择使用若干为字段作为标识
        部分标识可能发生变化，如：12 => 13 或 24 => 25
    根据以上规则，判断以下标识是否代表同一用户：
	    11-21-32、12-22-33 (X)
        11-21-32、11-21-52 (OK)
        21-32、11-21-33 (OK)
		11-21-32、32-48 (OK)
	问题：在以下数据中，找到同一用户，合并相同用户的数据
	    对于用户标识(id)：合并后去重
        对于用户的信息：key相同，合并权重
List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中关村" -> 1.0)
List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龙观" -> 1.0)
List(41L), List("kw$天津" -> 1.0, "area$中关村" -> 1.0)

List(12L, 22L, 33L), List("kw$大数据" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)
List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)
List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)

package cn.lagou.graphx

import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GraphXExample3 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    // 原始数据集
    val lst: List[(List[Long], List[(String, Double)])] = List(
      (List(11L, 21L, 31L), List("kw$北京" -> 1.0, "kw$上海" -> 1.0, "area$中关村" -> 1.0)),
      (List(21L, 32L, 41L), List("kw$上海" -> 1.0, "kw$天津" -> 1.0, "area$回龙观" -> 1.0)),
      (List(41L), List("kw$天津" -> 1.0, "area$中关村" -> 1.0)),
      (List(12L, 22L, 33L), List("kw$大数据" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0)),
      (List(22L, 34L, 44L), List("kw$spark" -> 1.0, "area$五道口" -> 1.0)),
      (List(33L, 53L), List("kw$hive" -> 1.0, "kw$spark" -> 1.0, "area$西二旗" -> 1.0))
    )
    val rawRDD: RDD[(List[Long], List[(String, Double)])] = sc.makeRDD(lst)




    // 创建边。RDD[Edge(long, Long, T1)]
    // List(11L, 21L, 31L), A1 => 11 -> 112131, 21 -> 112131, -> 31 ->112131
    val dotRDD: RDD[(Long, Long)] = rawRDD.flatMap { case (ids, _) =>
      ids.map(id => (id, ids.mkString.toLong))
    }
    //dotRDD.foreach(println)
    val edgesRDD: RDD[Edge[Int]] = dotRDD.map { case (id, ids) => Edge(id, ids, 0) }

    // 创建顶点。RDD顶点[(long, T1)]
    val vertexesRDD: RDD[(Long, String)] = dotRDD.map { case (id, _) => (id, "") }

    // 生成图
    val graph: Graph[String, Int] = Graph(vertexesRDD, edgesRDD)

    // 调用强连通体算法. 识别6条数据，代表2个不同的用户
    val connectedRDD: VertexRDD[VertexId] = graph.connectedComponents()
      .vertices
//    connectedRDD.foreach(println)

    // 定义中心的数据
    val centerVertexRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] =
      rawRDD.map { case (ids, info) => (ids.mkString.toLong, (ids, info)) }
//    centerVertexRDD.foreach(println)

    // join操作，拿到分组的标记
    val dataRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] =
      connectedRDD.join(centerVertexRDD)
        .map { case (_, (v1, v2)) => (v1, v2) }

    // 数据聚合、合并
    val reslutRDD: RDD[(VertexId, (List[VertexId], List[(String, Double)]))] =
      dataRDD.reduceByKey { case ((bufIds, bufInfo), (ids, info)) =>

         // 数据聚合
         val newIds: List[VertexId] = bufIds ++ ids
         val newInfo: List[(String, Double)] = bufInfo ++ info

         // 对用户id做去重，对标签做合并
         (newIds.distinct, newInfo.groupBy(_._1).mapValues(lst =>
           lst.map(_._2).sum).toList)
    }

    reslutRDD.foreach(println)

    sc.stop()
  }

}
